[Apache Hudi]新発表されたGlue3.0でApache Hudiを動かす

[Apache Hudi]新発表されたGlue3.0でApache Hudiを動かす

Clock Icon2021.08.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データ・アナリティクス事業本部の森脇です。

先日、Spark 3.1に対応したGlue3.0が発表されました。

Apache HudiはSpark3系にも対応していますので、早速動作検証をしてみました。

結論

Glue3.0でApache Hudiを使いたい場合は、以下のライブラリを依存ライブラリが必要です。

また、Hive Sync機能も使いたい場合はあわせて以下のライブラリも必要です。

必要なApache Hudiのライブラリ

公式のセットアップガイドを参照し、必要なjarライブラリを確認します。

Spark3でApache Hudi動かす場合、

  • Spark3バンドルのApache Hudi
  • Sparkと同一バージョンのSpark Avro

の2つが必要なようです。

Glue3.0のSparkバージョンは3.1.1なので、hudi-spark3-bundle_2.12-0.8.0.jarspark-avro_2.12-3.1.1.jarをMavenリポジトリからダウンロードし、S3にアップロードしておきます。

Glueジョブの設定

Glueバージョンに3.0が追加されていますので、こちらを選択します。

依存JARパスに、先程S3にアップロードした2つのライブラリパスを指定します。

Hive Sync機能によるGlueデータカタログ連携も試したいので、該当の設定もOnにしておきます。

サンプルコードとして、Copy On Writeテーブルの新規作成処理を利用します。

import sys

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

databaseName = "default"
tableName = 'hudi_glue3_sample' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'upsert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数

  # データカタログ連携オプション(hive_sync)
  'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする
  'hoodie.datasource.hive_sync.database': databaseName, # 連携先のデータベース名
  'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名
  'hoodie.datasource.hive_sync.partition_fields': 'contient,country,city', # パーティションのフィールド名
  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',  # パーティションの方式を指定
  'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。
}

# データの書き込み
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(100))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)

job.commit()

ジョブを実行すると..エラーが発生してしまいました。

Caused by: java.lang.ClassNotFoundException: org.apache.calcite.rel.type.RelDataTypeSystem
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 59 more

調べたところ、Spark3環境でHive連携機能を利用した場合に上記のエラーが発生するようです。

GithubのIsuueに従い、以下のライブラリを追加の依存関係に設定したところ正常にジョブが終了しました。

また、Glueデータカタログに連携したテーブルもAthenaで問題なくクエリできました。

まとめ

Glue3.0でもApache Hudiを利用することが可能でした。

また、ジョブの実行速度、クエリの実行時間が2.0より高速化されているように感じました。

このあたりは別途まとめようと思います。

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.